#include "RpcChannel.h"
#include "RpcHeader.pb.h"
#include "ZooKeeperClient.h"
#include "RpcApplication.h"
#include "RpcController.h"
#include <memory>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include "RpcLogger.h"

namespace EasyRpc{
    std::mutex g_data_mutex;  // 全局互斥锁，用于保护共享数据的线程安全

    // RPC调用的核心方法，负责将客户端的请求序列化并发送到服务端，同时接收服务端的响应
    void RpcChannel::CallMethod(const ::google::protobuf::MethodDescriptor *method,
                                ::google::protobuf::RpcController *controller,
                                const ::google::protobuf::Message *request,
                                ::google::protobuf::Message *response,
                                ::google::protobuf::Closure *done)
    {
        if (-1 == m_clientfd) {  // 如果客户端socket未初始化
            // 获取服务对象名和方法名
            const google::protobuf::ServiceDescriptor *sd = method->service();
            service_name = sd->name();  // 服务名
            method_name = method->name();  // 方法名

            // 客户端需要查询ZooKeeper，找到提供该服务的服务器地址
            ZooKeeperClient zkCli;
            zkCli.Start();  // 连接ZooKeeper服务器
            std::string host_data = QueryServiceHost(&zkCli, service_name, method_name, m_idx);  // 查询服务地址
            std::string m_ip = host_data.substr(0, m_idx);  // 从查询结果中提取IP地址
            std::cout << "ip: " << m_ip << std::endl;
            int m_port = atoi(host_data.substr(m_idx + 1, host_data.size() - m_idx).c_str());  // 从查询结果中提取端口号
            std::cout << "port: " << m_port << std::endl;
    
            // 尝试连接服务器
            auto rt = newConnect(m_ip.c_str(), m_port);
            if (!rt) {
                RpcLogger::Error("connect Rpcserver error");
                return;
            } else {
                RpcLogger::Info("connect Rpcserver success");
            }
        }  // endif
    
        // 将请求参数序列化为字符串，并计算其长度
        uint32_t args_size{};
        std::string args_str;
        if (request->SerializeToString(&args_str)) {  // 序列化请求参数
            args_size = args_str.size();  // 获取序列化后的长度
        } else {
            controller->SetFailed("serialize request fail");  // 序列化失败，设置错误信息
            return;
        }
    
        // 定义RPC请求的头部信息
        Header::RpcHeader rpc_header;
        // LOG(INFO) << "service_name : "<<service_name;  // 连接成功，记录日志
        // LOG(INFO) << "method_name : "<<method_name;  // 连接成功，记录日志
        // LOG(INFO) << "args_size : "<<args_size;  // 连接成功，记录日志
        rpc_header.set_service_name(service_name);  // 设置服务名
        rpc_header.set_method_name(method_name);  // 设置方法名
        rpc_header.set_args_size(args_size);  // 设置参数长度
    
        // 将RPC头部信息序列化为字符串，并计算其长度
        uint32_t header_size = 0;
        std::string rpc_header_str;
        if (rpc_header.SerializeToString(&rpc_header_str)) {  // 序列化头部信息
            header_size = rpc_header_str.size();  // 获取序列化后的长度
        } else {
            controller->SetFailed("serialize rpc header error!");  // 序列化失败，设置错误信息
            return;
        }
        // LOG(INFO) << "header_size : "<<header_size;  // 连接成功，记录日志
        // LOG(INFO) << "rpc_header_str : "<<rpc_header_str;  // 连接成功，记录日志
        // 将头部长度和头部信息拼接成完整的RPC请求报文
        std::string send_rpc_str;
        {
            google::protobuf::io::StringOutputStream string_output(&send_rpc_str);
            google::protobuf::io::CodedOutputStream coded_output(&string_output);
            coded_output.WriteVarint32(static_cast<uint32_t>(header_size));  // 写入头部长度
            coded_output.WriteString(rpc_header_str);  // 写入头部信息
        }
        send_rpc_str += args_str;  // 拼接请求参数
    
        // 发送RPC请求到服务器
        if (-1 == send(m_clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0)) {
            close(m_clientfd);  // 发送失败，关闭socket
            char errtxt[512] = {};
            RpcLogger::Error("send error: " + std::string(strerror_r(errno, errtxt, sizeof(errtxt))));
            controller->SetFailed(errtxt);  // 设置错误信息
            return;
        }
    
        // 接收服务器的响应
        char recv_buf[1024] = {0};
        int recv_size = 0;
        if (-1 == (recv_size = recv(m_clientfd, recv_buf, 1024, 0))) {
            char errtxt[512] = {};
            RpcLogger::Error("recv error: " + std::string(strerror_r(errno, errtxt, sizeof(errtxt))));
            // std::cout << "recv error" << strerror_r(errno, errtxt, sizeof(errtxt)) << std::endl;  // 打印错误信息
            controller->SetFailed(errtxt);  // 设置错误信息
            return;
        }
    
        // 将接收到的响应数据反序列化为response对象
        if (!response->ParseFromArray(recv_buf, recv_size)) {
            close(m_clientfd);  // 反序列化失败，关闭socket
            char errtxt[512] = {};
            RpcLogger::Error("parse error: " + std::string(strerror_r(errno, errtxt, sizeof(errtxt))));
            // std::cout << "parse error" << strerror_r(errno, errtxt, sizeof(errtxt)) << std::endl;  // 打印错误信息
            controller->SetFailed(errtxt);  // 设置错误信息
            return;
        }
    
        close(m_clientfd);  // 关闭socket连接
    }
    
    // 创建新的socket连接
    bool RpcChannel::newConnect(const char *ip, uint16_t port) {
        // 创建socket
        int clientfd = socket(AF_INET, SOCK_STREAM, 0);
        if (-1 == clientfd) {
            char errtxt[512] = {0};
            RpcLogger::Error("socket error: " + std::string(strerror_r(errno, errtxt, sizeof(errtxt))));
            return false;
        }
    
        // 设置服务器地址信息
        struct sockaddr_in server_addr;
        server_addr.sin_family = AF_INET;  // IPv4地址族
        server_addr.sin_port = htons(port);  // 端口号
        server_addr.sin_addr.s_addr = inet_addr(ip);  // IP地址
    
        // 尝试连接服务器
        if (-1 == connect(clientfd, (struct sockaddr *)&server_addr, sizeof(server_addr))) {
            close(clientfd);  // 连接失败，关闭socket
            char errtxt[512] = {0};
            RpcLogger::Error("connect error: " + std::string(strerror_r(errno, errtxt, sizeof(errtxt))));
            return false;
        }
    
        m_clientfd = clientfd;  // 保存socket文件描述符
        return true;
    }
    
    // 从ZooKeeper查询服务地址
    std::string RpcChannel::QueryServiceHost(ZooKeeperClient *zkclient, std::string service_name, std::string method_name, int &idx) {
        std::string method_path = "/" + service_name + "/" + method_name;  // 构造ZooKeeper路径
        std::cout << "method_path: " << method_path << std::endl;
    
        std::unique_lock<std::mutex> lock(g_data_mutex);  // 加锁，保证线程安全
        std::string host_data_1 = zkclient->GetData(method_path.c_str());  // 从ZooKeeper获取数据
        lock.unlock();  // 解锁
    
        if (host_data_1 == "") {  // 如果未找到服务地址
            RpcLogger::Error(method_path + " is not exist!");
            return " ";
        }
    
        idx = host_data_1.find(":");  // 查找IP和端口的分隔符
        if (idx == -1) {  // 如果分隔符不存在
            RpcLogger::Error(method_path + " address is invalid!");
            return " ";
        }
    
        return host_data_1;  // 返回服务地址
    }
    
    // 构造函数，支持延迟连接
    RpcChannel::RpcChannel(bool connectNow) : m_clientfd(-1), m_idx(0) {
        if (!connectNow) {  // 如果不需要立即连接
            return;
        }
    
        // 尝试连接服务器，最多重试3次
        auto rt = newConnect(m_ip.c_str(), m_port);
        int count = 3;  // 重试次数
        while (!rt && count--) {
            rt = newConnect(m_ip.c_str(), m_port);
        }
    }
}
